package kafka2

import (
	"gitee.com/chejiangyi/bsfgo/core/base2"
	"github.com/IBM/sarama"
)

// KafkaProducer is a thin wrapper around a sarama synchronous producer.
type KafkaProducer struct {
	// SyncProducer is the underlying sarama producer that SendMessage and
	// Close delegate to. SendMessage calls it without a nil check, whereas
	// Close skips the call when it is nil.
	SyncProducer sarama.SyncProducer
}

// SendMessage sends a single message to the given topic through the
// underlying synchronous producer.
//
// key and value are passed to the producer as sarama.StringEncoder values.
// On success it returns the partition and offset reported by the producer.
// If the producer returns an error, SendMessage panics with the value built
// by base2.NewBsfError2 from that error rather than returning it.
func (k *KafkaProducer) SendMessage(topic string, key string, value string) (partition int32, offset int64) {
	var err error
	partition, offset, err = k.SyncProducer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.StringEncoder(value),
	})
	if err != nil {
		panic(base2.NewBsfError2("消息发送出错", err))
	}
	return partition, offset
}

// Close closes the underlying synchronous producer if one is set; it does
// nothing when SyncProducer is nil.
func (k *KafkaProducer) Close() {
	if k.SyncProducer == nil {
		return
	}
	// The error returned by the producer's Close is intentionally discarded:
	// this method has no return value to report it through.
	_ = k.SyncProducer.Close()
}
